package org.codehaus.activemq;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.j2ee.statistics.Stats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.management.JMSSessionStatsImpl;
import org.codehaus.activemq.management.StatsCapable;
import org.codehaus.activemq.message.ActiveMQBytesMessage;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMapMessage;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQObjectMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQStreamMessage;
import org.codehaus.activemq.message.ActiveMQTemporaryQueue;
import org.codehaus.activemq.message.ActiveMQTemporaryTopic;
import org.codehaus.activemq.message.ActiveMQTextMessage;
import org.codehaus.activemq.message.ActiveMQTopic;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.MessageAcknowledge;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.TransactionInfo;
import org.codehaus.activemq.ra.LocalTransactionEventListener;
import org.codehaus.activemq.util.IdGenerator;

public class ActiveMQSession
  implements Session, QueueSession, TopicSession, ActiveMQMessageDispatcher, MessageAcknowledge, StatsCapable
{
  /** Logger shared by all sessions. */
  private static final Log log = LogFactory.getLog(ActiveMQSession.class);
  /** Owning connection; every packet this session produces is sent through it. */
  protected ActiveMQConnection connection;
  /** Acknowledge mode given at construction; compared against 0 (transacted) and 2 (client acknowledge) in this class. */
  private int acknowledgeMode;
  /** Consumers registered through {@code addConsumer}; also handed to the stats object. */
  protected CopyOnWriteArrayList consumers;
  /** Producers registered through {@code addProducer}; also handed to the stats object. */
  protected CopyOnWriteArrayList producers;
  /** Source of transaction ids (see {@code getNextTransactionId}). */
  private IdGenerator transactionIdGenerator;
  /** Source of the suffix appended to temporary queue/topic names. */
  private IdGenerator temporaryDestinationGenerator;
  /** Source of ids for the packets this session sends. */
  protected IdGenerator packetIdGenerator;
  /** Source of ids assigned to producers in {@code addProducer}. */
  private IdGenerator producerIdGenerator;
  /** Source of ids assigned to consumers in {@code addConsumer}. */
  private IdGenerator consumerIdGenerator;
  /** Session-level listener used by {@code run()}; while non-null, {@code dispatch} does not forward to consumers. */
  private MessageListener messageListener;
  /** Set to true at the end of {@code close()}; consulted by {@code checkClosed()}. */
  protected SynchronizedBoolean closed;
  /** True between {@code doStartTransaction()} sending the start packet and the next commit/rollback. */
  private SynchronizedBoolean startTransaction;
  /** Id obtained from the connection at construction; copied into consumer/producer info packets. */
  private String sessionId;
  /** Transaction id stamped on acks and sent messages; replaced on every commit/rollback. */
  protected String currentTransactionId;
  /** Wall-clock creation time of this session in milliseconds. */
  private long startTime;
  /** Monitor held by {@code dispatch} and {@code recover} while they touch the message lists. */
  private Object deliveryMutex;
  /** Optional listener notified of begin/commit/rollback events; may be null. */
  private LocalTransactionEventListener localTransactionEventListener;
  /** Messages recorded by {@code messageDelivered} in acknowledge mode 2 until {@code acknowledge()} or {@code recover()}. */
  private LinkedList deliveredMessages;
  /** Messages added by {@code dispatch} that have not yet been removed after delivery. */
  private LinkedList inboundMessages;
  /** Statistics for this session, built over the producer and consumer lists. */
  private JMSSessionStatsImpl stats;

  /**
   * Creates a session on the given connection and registers it with that connection.
   *
   * <p>All internal state, including the statistics object, is initialised before the
   * session is handed to {@code connection.addSession}, so the connection never sees a
   * partially constructed session.
   *
   * @param theConnection the owning connection; supplies the session id
   * @param theAcknowledgeMode the acknowledge mode; 0 makes the session transacted
   * @throws JMSException if the transacted check or the registration with the connection fails
   */
  protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode)
    throws JMSException
  {
    this.connection = theConnection;
    this.acknowledgeMode = theAcknowledgeMode;
    this.consumers = new CopyOnWriteArrayList();
    this.producers = new CopyOnWriteArrayList();
    this.producerIdGenerator = new IdGenerator();
    this.consumerIdGenerator = new IdGenerator();
    this.transactionIdGenerator = new IdGenerator();
    this.temporaryDestinationGenerator = new IdGenerator();
    this.packetIdGenerator = new IdGenerator();
    this.closed = new SynchronizedBoolean(false);
    this.startTransaction = new SynchronizedBoolean(false);
    this.sessionId = this.connection.generateSessionId();
    this.startTime = System.currentTimeMillis();
    this.deliveredMessages = new LinkedList();
    this.inboundMessages = new LinkedList();
    this.deliveryMutex = new Object();
    // Build the stats before publishing `this`: once addSession() returns, other code
    // reachable from the connection may call getStats() and must not observe null.
    this.stats = new JMSSessionStatsImpl(this.producers, this.consumers);
    if (getTransacted()) {
      this.currentTransactionId = getNextTransactionId();
    }
    // Registration is the last step so a failure above leaves nothing registered.
    this.connection.addSession(this);
  }

  /**
   * Returns this session's statistics through the generic {@link Stats} view.
   *
   * @return the statistics object created in the constructor
   */
  public Stats getStats() {
    return stats;
  }

  /**
   * Returns this session's statistics with their concrete type.
   *
   * @return the same object as {@code getStats()}, typed as {@link JMSSessionStatsImpl}
   */
  public JMSSessionStatsImpl getSessionStats() {
    return stats;
  }

  /**
   * Creates a new, empty bytes message.
   *
   * @return a fresh {@link ActiveMQBytesMessage}
   * @throws JMSException if this session has been closed
   */
  public BytesMessage createBytesMessage()
    throws JMSException
  {
    checkClosed();
    BytesMessage created = new ActiveMQBytesMessage();
    return created;
  }

  /**
   * Creates a new, empty map message.
   *
   * @return a fresh {@link ActiveMQMapMessage}
   * @throws JMSException if this session has been closed
   */
  public MapMessage createMapMessage()
    throws JMSException
  {
    checkClosed();
    MapMessage created = new ActiveMQMapMessage();
    return created;
  }

  /**
   * Creates a new message with no body.
   *
   * @return a fresh {@link ActiveMQMessage}
   * @throws JMSException if this session has been closed
   */
  public Message createMessage()
    throws JMSException
  {
    checkClosed();
    Message created = new ActiveMQMessage();
    return created;
  }

  /**
   * Creates a new object message with no payload set.
   *
   * @return a fresh {@link ActiveMQObjectMessage}
   * @throws JMSException if this session has been closed
   */
  public ObjectMessage createObjectMessage()
    throws JMSException
  {
    checkClosed();
    ObjectMessage created = new ActiveMQObjectMessage();
    return created;
  }

  /**
   * Creates a new object message carrying the given payload.
   *
   * @param object the payload passed to {@code setObject} on the new message
   * @return a fresh {@link ActiveMQObjectMessage} holding {@code object}
   * @throws JMSException if this session has been closed or the payload cannot be set
   */
  public ObjectMessage createObjectMessage(Serializable object)
    throws JMSException
  {
    checkClosed();
    ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
    objectMessage.setObject(object);
    return objectMessage;
  }

  /**
   * Creates a new, empty stream message.
   *
   * @return a fresh {@link ActiveMQStreamMessage}
   * @throws JMSException if this session has been closed
   */
  public StreamMessage createStreamMessage()
    throws JMSException
  {
    checkClosed();
    StreamMessage created = new ActiveMQStreamMessage();
    return created;
  }

  /**
   * Creates a new text message with no text set.
   *
   * @return a fresh {@link ActiveMQTextMessage}
   * @throws JMSException if this session has been closed
   */
  public TextMessage createTextMessage()
    throws JMSException
  {
    checkClosed();
    TextMessage created = new ActiveMQTextMessage();
    return created;
  }

  /**
   * Creates a new text message initialised with the given text.
   *
   * @param text the text passed to {@code setText} on the new message
   * @return a fresh {@link ActiveMQTextMessage} holding {@code text}
   * @throws JMSException if this session has been closed or the text cannot be set
   */
  public TextMessage createTextMessage(String text)
    throws JMSException
  {
    checkClosed();
    ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
    textMessage.setText(text);
    return textMessage;
  }

  /**
   * Reports whether this session is transacted.
   *
   * @return true when the acknowledge mode equals {@code Session.SESSION_TRANSACTED} (0)
   * @throws JMSException if this session has been closed
   */
  public boolean getTransacted()
    throws JMSException
  {
    checkClosed();
    return Session.SESSION_TRANSACTED == this.acknowledgeMode;
  }

  /**
   * Returns the acknowledge mode this session was created with.
   *
   * @return the raw acknowledge mode value
   * @throws JMSException if this session has been closed
   */
  public int getAcknowledgeMode()
    throws JMSException
  {
    checkClosed();
    int mode = this.acknowledgeMode;
    return mode;
  }

  /**
   * Commits the transaction currently open on this session, if one was started.
   *
   * <p>When no transaction has been started since the last commit or rollback the call
   * is a no-op. Otherwise a transaction packet of type 103 is sent synchronously and the
   * local transaction listener, if any, is told about the commit.
   *
   * @throws JMSException if the session is closed, is not transacted, or the send fails
   */
  public void commit()
    throws JMSException
  {
    checkClosed();
    if (!getTransacted()) {
      throw new IllegalStateException("Not a transacted session");
    }

    // Atomically flip "transaction started" back to false; bail out if it was not set.
    if (!this.startTransaction.commit(true, false)) {
      return;
    }

    TransactionInfo commitPacket = new TransactionInfo();
    commitPacket.setId(this.packetIdGenerator.generateId());
    commitPacket.setTransactionId(this.currentTransactionId);
    commitPacket.setType(103); // presumably the "commit" packet type - confirm against TransactionInfo

    // The next transaction id is allocated before the commit is sent.
    this.currentTransactionId = getNextTransactionId();

    this.connection.syncSendPacket(commitPacket);
    if (this.localTransactionEventListener != null) {
      this.localTransactionEventListener.commitEvent();
    }
  }

  /**
   * Rolls back the transaction currently open on this session, if one was started.
   *
   * <p>When no transaction has been started since the last commit or rollback the call
   * is a no-op. Otherwise a transaction packet of type 105 is sent asynchronously and the
   * local transaction listener, if any, is told about the rollback.
   *
   * @throws JMSException if the session is closed, is not transacted, or the send fails
   */
  public void rollback()
    throws JMSException
  {
    checkClosed();
    if (!getTransacted()) {
      throw new IllegalStateException("Not a transacted session");
    }

    // Atomically flip "transaction started" back to false; bail out if it was not set.
    if (!this.startTransaction.commit(true, false)) {
      return;
    }

    TransactionInfo rollbackPacket = new TransactionInfo();
    rollbackPacket.setId(this.packetIdGenerator.generateId());
    rollbackPacket.setTransactionId(this.currentTransactionId);
    rollbackPacket.setType(105); // presumably the "rollback" packet type - confirm against TransactionInfo

    // The next transaction id is allocated before the rollback is sent.
    this.currentTransactionId = getNextTransactionId();
    this.connection.asyncSendPacket(rollbackPacket);

    if (this.localTransactionEventListener != null) {
      this.localTransactionEventListener.rollbackEvent();
    }
  }

  /**
   * Closes this session: closes every consumer and producer, unregisters the session
   * from its connection and drops all buffered messages.
   *
   * <p>Calling this on an already closed session does nothing. The closed flag is only
   * raised as the very last step, so code running during the shutdown still sees the
   * session as open.
   *
   * @throws JMSException if closing a consumer or producer, or unregistering, fails
   */
  public void close()
    throws JMSException
  {
    if (this.closed.get()) {
      return;
    }

    Iterator consumerIter = this.consumers.iterator();
    while (consumerIter.hasNext()) {
      ((ActiveMQMessageConsumer)consumerIter.next()).close();
    }

    Iterator producerIter = this.producers.iterator();
    while (producerIter.hasNext()) {
      ((ActiveMQMessageProducer)producerIter.next()).close();
    }

    this.consumers.clear();
    this.producers.clear();
    this.connection.removeSession(this);
    this.inboundMessages.clear();
    this.deliveredMessages.clear();
    this.closed.set(true);
  }

  /**
   * Guards operations that are illegal on a closed session.
   *
   * @throws IllegalStateException if {@code close()} has completed on this session
   */
  protected void checkClosed()
    throws IllegalStateException
  {
    if (this.closed.get()) {
      // This guard belongs to the session, so the message must name the session, not a consumer.
      throw new IllegalStateException("The Session is closed");
    }
  }

  /**
   * Re-dispatches every message that was delivered but not yet acknowledged, plus any
   * message still waiting in the inbound list, marking each as redelivered.
   *
   * <p>Delivered messages are replayed first, then inbound ones, each at most once.
   *
   * @throws JMSException if the session is closed or is transacted
   */
  public void recover()
    throws JMSException
  {
    checkClosed();
    if (getTransacted()) {
      throw new IllegalStateException("This session is transacted");
    }
    synchronized (this.deliveryMutex) {
      // Insertion-ordered set: keeps delivery order while dropping duplicates.
      LinkedHashSet pending = new LinkedHashSet();
      pending.addAll(this.deliveredMessages);
      pending.addAll(this.inboundMessages);
      this.deliveredMessages.clear();
      this.inboundMessages.clear();

      Iterator pendingIter = pending.iterator();
      while (pendingIter.hasNext()) {
        ActiveMQMessage redelivery = (ActiveMQMessage)pendingIter.next();
        this.inboundMessages.remove(redelivery);
        redelivery.setJMSRedelivered(true);
        // dispatch() re-adds the message to inboundMessages and forwards it to consumers.
        dispatch(redelivery);
      }
      pending.clear();
    }
  }

  /**
   * Returns the session-level message listener.
   *
   * @return the listener last set with {@code setMessageListener}, or null if none
   * @throws JMSException if this session has been closed
   */
  public MessageListener getMessageListener()
    throws JMSException
  {
    checkClosed();
    MessageListener current = this.messageListener;
    return current;
  }

  /**
   * Installs the session-level message listener used by {@code run()}.
   *
   * <p>While a listener is installed, {@code dispatch} only queues incoming messages
   * and does not forward them to individual consumers.
   *
   * @param listener the listener to install; null removes the current one
   * @throws JMSException if this session has been closed
   */
  public void setMessageListener(MessageListener listener)
    throws JMSException
  {
    checkClosed();
    messageListener = listener;
  }

  /**
   * Delivers every message currently in {@code inboundMessages} to the session-level
   * {@link MessageListener} and reports each one through {@code messageDelivered}.
   *
   * <p>A message is reported as consumed only when {@code onMessage} returned normally;
   * if the listener throws, or no listener is installed, it is reported as not consumed.
   * Outside acknowledge mode 2 each message is removed from the list after handling.
   */
  public void run()
  {
    // Read the listener once so the whole pass uses the same instance.
    MessageListener listener = this.messageListener;

    // 2 is javax.jms.Session.CLIENT_ACKNOWLEDGE: in that mode messages stay in the list
    // here and are removed later by acknowledge().
    boolean doRemove = this.acknowledgeMode != 2;
    Iterator i;
    // NOTE(review): this locks inboundMessages, while dispatch() and recover() guard the
    // same list with deliveryMutex - confirm whether the two can run concurrently.
    synchronized (this.inboundMessages) {
      for (i = this.inboundMessages.iterator(); i.hasNext(); ) {
        ActiveMQMessage message = (ActiveMQMessage)i.next();
        if (listener != null) {
          try {
            listener.onMessage(message);
            messageDelivered(true, message, true, false);
          }
          catch (Throwable t) {
            // A failing listener must not abort the pass; record the message as not consumed.
            log.info("Caught :" + t, t);
            messageDelivered(true, message, false, false);
          }
        }
        else {
          // No listener installed: still report the message, flagged as not consumed.
          messageDelivered(true, message, false, false);
        }
        if (doRemove)
          i.remove();
      }
    }
  }

  /**
   * Creates a producer for the given destination.
   *
   * @param destination the default destination, converted with
   *     {@code ActiveMQMessageTransformation.transformDestination}
   * @return a new {@link ActiveMQMessageProducer} bound to this session
   * @throws JMSException if this session has been closed or the producer cannot be created
   */
  public MessageProducer createProducer(Destination destination)
    throws JMSException
  {
    checkClosed();
    MessageProducer created = new ActiveMQMessageProducer(
        this,
        ActiveMQMessageTransformation.transformDestination(destination));
    return created;
  }

  /**
   * Creates a consumer for the given destination with an empty selector.
   *
   * <p>The prefetch size comes from the connection's prefetch policy: the topic value
   * for topics, the queue value for everything else.
   *
   * @param destination the destination to consume from
   * @return a new {@link ActiveMQMessageConsumer} bound to this session
   * @throws JMSException if this session has been closed or the consumer cannot be created
   */
  public MessageConsumer createConsumer(Destination destination)
    throws JMSException
  {
    checkClosed();
    int prefetch;
    if (destination instanceof Topic) {
      prefetch = this.connection.getPrefetchPolicy().getTopicPrefetch();
    }
    else {
      prefetch = this.connection.getPrefetchPolicy().getQueuePrefetch();
    }

    return new ActiveMQMessageConsumer(
        this,
        ActiveMQMessageTransformation.transformDestination(destination),
        "",
        "",
        this.connection.getNextConsumerNumber(),
        prefetch,
        false,
        false);
  }

  /**
   * Creates a consumer for the given destination that filters with a message selector.
   *
   * <p>The prefetch size comes from the connection's prefetch policy: the topic value
   * for topics, the queue value for everything else.
   *
   * @param destination the destination to consume from
   * @param messageSelector the selector handed to the new consumer
   * @return a new {@link ActiveMQMessageConsumer} bound to this session
   * @throws JMSException if this session has been closed or the consumer cannot be created
   */
  public MessageConsumer createConsumer(Destination destination, String messageSelector)
    throws JMSException
  {
    checkClosed();
    int prefetch;
    if (destination instanceof Topic) {
      prefetch = this.connection.getPrefetchPolicy().getTopicPrefetch();
    }
    else {
      prefetch = this.connection.getPrefetchPolicy().getQueuePrefetch();
    }

    return new ActiveMQMessageConsumer(
        this,
        ActiveMQMessageTransformation.transformDestination(destination),
        "",
        messageSelector,
        this.connection.getNextConsumerNumber(),
        prefetch,
        false,
        false);
  }

  /**
   * Creates a consumer for the given destination with a selector and a no-local flag.
   *
   * <p>The prefetch size is chosen the same way as in the other {@code createConsumer}
   * overloads: the topic prefetch for topics, the queue prefetch otherwise. Previously
   * this overload always used the topic prefetch, even for queues.
   *
   * @param destination the destination to consume from
   * @param messageSelector the selector handed to the new consumer
   * @param NoLocal the no-local flag handed to the new consumer
   * @return a new {@link ActiveMQMessageConsumer} bound to this session
   * @throws JMSException if this session has been closed or the consumer cannot be created
   */
  public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
    throws JMSException
  {
    checkClosed();
    int prefetch = (destination instanceof Topic)
        ? this.connection.getPrefetchPolicy().getTopicPrefetch()
        : this.connection.getPrefetchPolicy().getQueuePrefetch();
    return new ActiveMQMessageConsumer(
        this,
        ActiveMQMessageTransformation.transformDestination(destination),
        "",
        messageSelector,
        this.connection.getNextConsumerNumber(),
        prefetch,
        NoLocal,
        false);
  }

  /**
   * Creates a queue identity for the given name.
   *
   * @param queueName the name passed to the {@link ActiveMQQueue} constructor
   * @return a new {@link ActiveMQQueue}
   * @throws JMSException if this session has been closed
   */
  public Queue createQueue(String queueName)
    throws JMSException
  {
    checkClosed();
    Queue queue = new ActiveMQQueue(queueName);
    return queue;
  }

  /**
   * Creates a topic identity for the given name.
   *
   * @param topicName the name passed to the {@link ActiveMQTopic} constructor
   * @return a new {@link ActiveMQTopic}
   * @throws JMSException if this session has been closed
   */
  public Topic createTopic(String topicName)
    throws JMSException
  {
    checkClosed();
    Topic topic = new ActiveMQTopic(topicName);
    return topic;
  }

  /**
   * Creates a durable subscriber on the given topic with an empty selector.
   *
   * <p>The prefetch size is the durable-topic value of the connection's prefetch policy.
   *
   * @param topic the topic to subscribe to
   * @param name the subscription name handed to the new subscriber
   * @return a new {@link ActiveMQTopicSubscriber} bound to this session
   * @throws JMSException if this session has been closed or the subscriber cannot be created
   */
  public TopicSubscriber createDurableSubscriber(Topic topic, String name)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQTopicSubscriber(
        this,
        ActiveMQMessageTransformation.transformDestination(topic),
        name,
        "",
        this.connection.getNextConsumerNumber(),
        this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
        false,
        false);
  }

  /**
   * Creates a durable subscriber on the given topic with a selector and no-local flag.
   *
   * <p>The prefetch size is the durable-topic value of the connection's prefetch policy.
   *
   * @param topic the topic to subscribe to
   * @param name the subscription name handed to the new subscriber
   * @param messageSelector the selector handed to the new subscriber
   * @param noLocal the no-local flag handed to the new subscriber
   * @return a new {@link ActiveMQTopicSubscriber} bound to this session
   * @throws JMSException if this session has been closed or the subscriber cannot be created
   */
  public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQTopicSubscriber(
        this,
        ActiveMQMessageTransformation.transformDestination(topic),
        name,
        messageSelector,
        this.connection.getNextConsumerNumber(),
        this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
        noLocal,
        false);
  }

  /**
   * Creates a browser over the given queue with an empty selector.
   *
   * @param queue the queue to browse
   * @return a new {@link ActiveMQQueueBrowser} bound to this session
   * @throws JMSException if this session has been closed or the browser cannot be created
   */
  public QueueBrowser createBrowser(Queue queue)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQQueueBrowser(
        this,
        ActiveMQMessageTransformation.transformDestination(queue),
        "",
        this.connection.getNextConsumerNumber());
  }

  /**
   * Creates a browser over the given queue that filters with a message selector.
   *
   * @param queue the queue to browse
   * @param messageSelector the selector handed to the new browser
   * @return a new {@link ActiveMQQueueBrowser} bound to this session
   * @throws JMSException if this session has been closed or the browser cannot be created
   */
  public QueueBrowser createBrowser(Queue queue, String messageSelector)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQQueueBrowser(
        this,
        ActiveMQMessageTransformation.transformDestination(queue),
        messageSelector,
        this.connection.getNextConsumerNumber());
  }

  /**
   * Creates a temporary queue with a generated name.
   *
   * <p>The name is {@code "TemporaryQueue-"}, followed by the temporary name derived
   * from the connection's client id, followed by a freshly generated id.
   *
   * @return a new {@link ActiveMQTemporaryQueue}
   * @throws JMSException if this session has been closed or the client id is unavailable
   */
  public TemporaryQueue createTemporaryQueue()
    throws JMSException
  {
    checkClosed();
    String generatedName = "TemporaryQueue-"
        + ActiveMQDestination.createTemporaryName(this.connection.getClientID())
        + this.temporaryDestinationGenerator.generateId();
    return new ActiveMQTemporaryQueue(generatedName);
  }

  /**
   * Creates a temporary topic with a generated name.
   *
   * <p>The name is {@code "TemporaryTopic-"}, followed by the temporary name derived
   * from the connection's client id, followed by a freshly generated id.
   *
   * @return a new {@link ActiveMQTemporaryTopic}
   * @throws JMSException if this session has been closed or the client id is unavailable
   */
  public TemporaryTopic createTemporaryTopic()
    throws JMSException
  {
    checkClosed();
    String generatedName = "TemporaryTopic-"
        + ActiveMQDestination.createTemporaryName(this.connection.getClientID())
        + this.temporaryDestinationGenerator.generateId();
    return new ActiveMQTemporaryTopic(generatedName);
  }

  /**
   * Creates a receiver on the given queue with an empty selector.
   *
   * <p>The queue is converted with {@code ActiveMQMessageTransformation.transformDestination},
   * the same helper every other factory method in this class uses, and the prefetch size
   * is the queue value of the connection's prefetch policy.
   *
   * @param queue the queue to receive from
   * @return a new {@link ActiveMQQueueReceiver} bound to this session
   * @throws JMSException if this session has been closed or the receiver cannot be created
   */
  public QueueReceiver createReceiver(Queue queue)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQQueueReceiver(
        this,
        ActiveMQMessageTransformation.transformDestination(queue),
        "",
        this.connection.getNextConsumerNumber(),
        this.connection.getPrefetchPolicy().getQueuePrefetch());
  }

  /**
   * Creates a receiver on the given queue that filters with a message selector.
   *
   * <p>The prefetch size is the queue value of the connection's prefetch policy.
   *
   * @param queue the queue to receive from
   * @param messageSelector the selector handed to the new receiver
   * @return a new {@link ActiveMQQueueReceiver} bound to this session
   * @throws JMSException if this session has been closed or the receiver cannot be created
   */
  public QueueReceiver createReceiver(Queue queue, String messageSelector)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQQueueReceiver(
        this,
        ActiveMQMessageTransformation.transformDestination(queue),
        messageSelector,
        this.connection.getNextConsumerNumber(),
        this.connection.getPrefetchPolicy().getQueuePrefetch());
  }

  /**
   * Creates a sender for the given queue.
   *
   * @param queue the default queue of the new sender
   * @return a new {@link ActiveMQQueueSender} bound to this session
   * @throws JMSException if this session has been closed or the sender cannot be created
   */
  public QueueSender createSender(Queue queue)
    throws JMSException
  {
    checkClosed();
    QueueSender sender = new ActiveMQQueueSender(
        this,
        ActiveMQMessageTransformation.transformDestination(queue));
    return sender;
  }

  /**
   * Creates a non-durable subscriber on the given topic.
   *
   * <p>Both the subscription name and the selector are passed as null; the prefetch
   * size is the topic value of the connection's prefetch policy.
   *
   * @param topic the topic to subscribe to
   * @return a new {@link ActiveMQTopicSubscriber} bound to this session
   * @throws JMSException if this session has been closed or the subscriber cannot be created
   */
  public TopicSubscriber createSubscriber(Topic topic)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQTopicSubscriber(
        this,
        ActiveMQMessageTransformation.transformDestination(topic),
        null,
        null,
        this.connection.getNextConsumerNumber(),
        this.connection.getPrefetchPolicy().getTopicPrefetch(),
        false,
        false);
  }

  /**
   * Creates a non-durable subscriber on the given topic with a selector and no-local flag.
   *
   * <p>The subscription name is passed as null; the prefetch size is the topic value of
   * the connection's prefetch policy.
   *
   * @param topic the topic to subscribe to
   * @param messageSelector the selector handed to the new subscriber
   * @param noLocal the no-local flag handed to the new subscriber
   * @return a new {@link ActiveMQTopicSubscriber} bound to this session
   * @throws JMSException if this session has been closed or the subscriber cannot be created
   */
  public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal)
    throws JMSException
  {
    checkClosed();
    return new ActiveMQTopicSubscriber(
        this,
        ActiveMQMessageTransformation.transformDestination(topic),
        null,
        messageSelector,
        this.connection.getNextConsumerNumber(),
        this.connection.getPrefetchPolicy().getTopicPrefetch(),
        noLocal,
        false);
  }

  /**
   * Creates a publisher for the given topic.
   *
   * @param topic the default topic of the new publisher
   * @return a new {@link ActiveMQTopicPublisher} bound to this session
   * @throws JMSException if this session has been closed or the publisher cannot be created
   */
  public TopicPublisher createPublisher(Topic topic)
    throws JMSException
  {
    checkClosed();
    TopicPublisher publisher = new ActiveMQTopicPublisher(
        this,
        ActiveMQMessageTransformation.transformDestination(topic));
    return publisher;
  }

  /**
   * Removes the durable subscription with the given name for this connection's client id.
   *
   * <p>A {@link DurableUnsubscribe} packet is sent synchronously through the connection.
   *
   * @param name the subscriber name to unsubscribe
   * @throws JMSException if this session has been closed or the send fails
   */
  public void unsubscribe(String name)
    throws JMSException
  {
    checkClosed();
    DurableUnsubscribe unsubscribePacket = new DurableUnsubscribe();
    unsubscribePacket.setId(this.packetIdGenerator.generateId());
    unsubscribePacket.setClientId(this.connection.getClientID());
    unsubscribePacket.setSubscriberName(name);
    this.connection.syncSendPacket(unsubscribePacket);
  }

  /**
   * Tells whether at least one consumer of this session is a target of the message.
   *
   * @param message the message whose consumer targets are checked
   * @return true as soon as one registered consumer's number matches; false otherwise
   */
  public boolean isTarget(ActiveMQMessage message)
  {
    Iterator consumerIter = this.consumers.iterator();
    while (consumerIter.hasNext()) {
      ActiveMQMessageConsumer candidate = (ActiveMQMessageConsumer)consumerIter.next();
      boolean targeted = message.isConsumerTarget(candidate.getConsumerNumber());
      if (targeted) {
        return true;
      }
    }
    return false;
  }

  /**
   * Accepts an incoming message for this session.
   *
   * <p>The message is tied to this session for acknowledgement and appended to the
   * inbound list. When no session-level listener is installed, a shallow copy is also
   * handed to every consumer the message targets; failures there are reported to the
   * connection instead of being thrown.
   *
   * @param message the message to queue and forward
   */
  public void dispatch(ActiveMQMessage message)
  {
    message.setMessageAcknowledge(this);
    synchronized (this.deliveryMutex) {
      this.inboundMessages.add(message);
      if (this.messageListener != null) {
        // A session-level listener is installed: the message only waits in the inbound list.
        return;
      }

      Iterator consumerIter = this.consumers.iterator();
      while (consumerIter.hasNext()) {
        ActiveMQMessageConsumer target = (ActiveMQMessageConsumer)consumerIter.next();
        if (!message.isConsumerTarget(target.getConsumerNumber())) {
          continue;
        }
        try {
          target.processMessage(message.shallowCopy());
        }
        catch (JMSException e) {
          this.connection.handleAsyncException(e);
        }
      }
    }
  }

  /**
   * Acknowledges every message recorded as delivered on this session.
   *
   * <p>In client-acknowledge mode one {@link MessageAck} per delivered message is sent
   * asynchronously and the message is dropped from the inbound list. In every mode the
   * delivered list is emptied afterwards.
   *
   * @throws JMSException if this session has been closed or an acknowledgement cannot be sent
   */
  public void acknowledge()
    throws JMSException
  {
    checkClosed();
    if (Session.CLIENT_ACKNOWLEDGE == this.acknowledgeMode) {
      Iterator deliveredIter = this.deliveredMessages.iterator();
      while (deliveredIter.hasNext()) {
        ActiveMQMessage delivered = (ActiveMQMessage)deliveredIter.next();
        MessageAck receipt = new MessageAck();
        receipt.setConsumerId(delivered.getConsumerId());
        receipt.setMessageID(delivered.getJMSMessageID());
        receipt.setMessageRead(delivered.isMessageConsumed());
        receipt.setId(this.packetIdGenerator.generateId());
        this.connection.asyncSendPacket(receipt);
        this.inboundMessages.remove(delivered);
      }
    }
    this.deliveredMessages.clear();
  }

  /**
   * Records that a message has been handed to the application.
   *
   * <p>Nothing happens for a null message or a closed session. In client-acknowledge
   * mode the message is only remembered (with its consumed flag) until
   * {@code acknowledge()} is called. In every other mode an acknowledgement is sent
   * right away when requested, and the message is optionally removed from the inbound list.
   *
   * @param sendAcknowledge whether to send a {@link MessageAck} (ignored in client-acknowledge mode)
   * @param message the delivered message; may be null
   * @param messageConsumed whether the application actually consumed the message
   * @param doRemove whether to drop the message from the inbound list (ignored in client-acknowledge mode)
   */
  protected void messageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed, boolean doRemove) {
    if (message == null || this.closed.get()) {
      return;
    }

    if (Session.CLIENT_ACKNOWLEDGE == this.acknowledgeMode) {
      message.setMessageConsumed(messageConsumed);
      this.deliveredMessages.add(message);
      return;
    }

    if (sendAcknowledge) {
      try {
        doStartTransaction();
        MessageAck receipt = new MessageAck();
        receipt.setConsumerId(message.getConsumerId());
        receipt.setTransactionId(this.currentTransactionId);
        receipt.setMessageID(message.getJMSMessageID());
        receipt.setMessageRead(messageConsumed);
        receipt.setId(this.packetIdGenerator.generateId());
        receipt.setXaTransacted(isXaTransacted());

        this.connection.asyncSendPacket(receipt);
      }
      catch (JMSException e) {
        // A failed acknowledgement is logged rather than propagated to the caller.
        log.warn("failed to notify Broker that message is delivered", e);
      }
    }
    if (doRemove) {
      this.inboundMessages.remove(message);
    }
  }

  /**
   * Registers a consumer with this session and announces it through the connection.
   *
   * <p>The consumer receives a fresh consumer id, a started {@link ConsumerInfo} is
   * sent synchronously, and only after that succeeds is the consumer added to the
   * session and, for durable subscribers, counted in the statistics. A failed
   * registration therefore leaves neither the consumer list nor the stats changed.
   *
   * @param consumer the consumer to register
   * @throws JMSException if the info packet cannot be built or sent
   */
  protected void addConsumer(ActiveMQMessageConsumer consumer)
    throws JMSException
  {
    consumer.setConsumerId(this.consumerIdGenerator.generateId());
    ConsumerInfo info = createConsumerInfo(consumer);
    info.setStarted(true);
    this.connection.syncSendPacket(info);
    this.consumers.add(consumer);
    // Count the durable subscriber only once the registration has gone through, so
    // removeConsumer() stays the exact mirror of what was actually added.
    if (consumer.isDurableSubscriber()) {
      this.stats.onCreateDurableSubscriber();
    }
  }

  /**
   * Unregisters a consumer from this session.
   *
   * <p>The consumer is dropped from the list, the durable-subscriber statistic is
   * updated when applicable, and, unless the session is already closed, a stopped
   * {@link ConsumerInfo} is sent asynchronously through the connection.
   *
   * @param consumer the consumer to unregister
   * @throws JMSException if the info packet cannot be built or sent
   */
  protected void removeConsumer(ActiveMQMessageConsumer consumer)
    throws JMSException
  {
    this.consumers.remove(consumer);

    if (consumer.isDurableSubscriber()) {
      this.stats.onRemoveDurableSubscriber();
    }

    if (this.closed.get()) {
      return;
    }
    ConsumerInfo stoppedInfo = createConsumerInfo(consumer);
    stoppedInfo.setStarted(false);
    this.connection.asyncSendPacket(stoppedInfo);
  }

  /**
   * Builds the info packet describing a consumer of this session.
   *
   * <p>The packet copies the consumer's own attributes and adds the connection's client
   * id, this session's id and a freshly generated packet id. The started flag is left
   * for the caller to set.
   *
   * @param consumer the consumer to describe
   * @return a populated {@link ConsumerInfo}
   * @throws JMSException declared for subclasses and callers; this implementation only copies fields
   */
  protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
    ConsumerInfo description = new ConsumerInfo();
    description.setConsumerId(consumer.consumerId);
    description.setClientId(this.connection.clientID);
    description.setSessionId(this.sessionId);
    description.setConsumerNo(consumer.consumerNumber);
    description.setPrefetchNumber(consumer.prefetchNumber);
    description.setDestination(consumer.destination);
    description.setId(this.packetIdGenerator.generateId());
    description.setNoLocal(consumer.noLocal);
    description.setBrowser(consumer.browser);
    description.setSelector(consumer.messageSelector);
    description.setStartTime(consumer.startTime);
    description.setConsumerName(consumer.consumerName);
    return description;
  }

  /**
   * Registers a producer with this session and announces it through the connection.
   *
   * <p>The producer receives a fresh producer id, a started {@link ProducerInfo} is sent
   * synchronously, and the producer is then added to the session's producer list.
   *
   * @param producer the producer to register
   * @throws JMSException if the info packet cannot be built or sent
   */
  protected void addProducer(ActiveMQMessageProducer producer)
    throws JMSException
  {
    producer.setProducerId(this.producerIdGenerator.generateId());

    ProducerInfo startedInfo = createProducerInfo(producer);
    startedInfo.setStarted(true);
    this.connection.syncSendPacket(startedInfo);

    this.producers.add(producer);
  }

  /**
   * Unregisters a producer from this session.
   *
   * <p>The producer is dropped from the list and, unless the session is already
   * closed, a stopped {@link ProducerInfo} is sent asynchronously through the connection.
   *
   * @param producer the producer to unregister
   * @throws JMSException if the info packet cannot be built or sent
   */
  protected void removeProducer(ActiveMQMessageProducer producer)
    throws JMSException
  {
    this.producers.remove(producer);
    if (this.closed.get()) {
      return;
    }
    ProducerInfo stoppedInfo = createProducerInfo(producer);
    stoppedInfo.setStarted(false);
    this.connection.asyncSendPacket(stoppedInfo);
  }

  /**
   * Builds the info packet describing a producer of this session.
   *
   * <p>The packet carries the producer's id, default destination and start time, plus
   * the connection's client id, this session's id and a freshly generated packet id.
   * The started flag is left for the caller to set.
   *
   * @param producer the producer to describe
   * @return a populated {@link ProducerInfo}
   * @throws JMSException if a producer attribute cannot be read
   */
  protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
    ProducerInfo description = new ProducerInfo();
    description.setProducerId(producer.getProducerId());
    description.setClientId(this.connection.clientID);
    description.setSessionId(this.sessionId);
    description.setDestination(producer.defaultDestination);
    description.setId(this.packetIdGenerator.generateId());
    description.setStartTime(producer.getStartTime());
    return description;
  }

  /**
   * Start hook; this implementation does nothing.
   */
  protected void start()
  {
    // intentionally empty
  }

  /**
   * Stop hook; this implementation does nothing.
   */
  protected void stop()
  {
    // intentionally empty
  }

  /**
   * Returns the id of this session.
   *
   * @return the id generated by the connection at construction, or the value last set
   */
  protected String getSessionId()
  {
    return sessionId;
  }

  /**
   * Replaces the id of this session.
   *
   * @param sessionId the new session id; used in info packets built afterwards
   */
  protected void setSessionId(String sessionId)
  {
    String replacement = sessionId;
    this.sessionId = replacement;
  }

  /**
   * Returns the start time of this session.
   *
   * @return the creation time in milliseconds, or the value last set
   */
  protected long getStartTime()
  {
    return startTime;
  }

  /**
   * Replaces the start time of this session.
   *
   * @param startTime the new start time in milliseconds
   */
  protected void setStartTime(long startTime)
  {
    long replacement = startTime;
    this.startTime = replacement;
  }

  /**
   * Stamps the JMS headers on a message and sends it asynchronously through the connection.
   *
   * <p>A transaction is started first when the session is transacted. The expiration
   * is the send time plus {@code timeToLive} whenever {@code timeToLive} is positive,
   * whether or not the producer has disabled message timestamps; disabling timestamps
   * only suppresses the {@code JMSTimestamp} header. A sum that would overflow is
   * clamped to {@code Long.MAX_VALUE}.
   *
   * @param producer the producer on whose behalf the message is sent
   * @param destination the destination stamped on the message
   * @param message the message to send
   * @param deliveryMode the delivery mode stamped on the message
   * @param priority the priority stamped on the message
   * @param timeToLive lifetime in milliseconds; zero or negative means the message never expires
   * @throws JMSException if the session is closed, a header cannot be set, or the send fails
   */
  protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
    throws JMSException
  {
    checkClosed();

    doStartTransaction();
    message.setJMSDestination(destination);
    message.setJMSDeliveryMode(deliveryMode);
    message.setJMSPriority(priority);

    // The send time is needed for the expiration even when the timestamp header is disabled.
    long sendTime = System.currentTimeMillis();
    if (!producer.getDisableMessageTimestamp()) {
      message.setJMSTimestamp(sendTime);
    }

    long expiration = 0L;
    if (timeToLive > 0L) {
      // Clamp instead of wrapping to a negative value, which would read as already expired.
      expiration = (timeToLive > Long.MAX_VALUE - sendTime) ? Long.MAX_VALUE : sendTime + timeToLive;
    }
    message.setJMSExpiration(expiration);

    if (!producer.getDisableMessageID()) {
      message.setJMSMessageID(producer.getIdGenerator().generateId());
    }

    ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
    msg.setProducerID(producer.getProducerId());
    msg.setTransactionId(this.currentTransactionId);
    msg.setXaTransacted(isXaTransacted());
    msg.setJMSClientID(this.connection.clientID);
    this.connection.asyncSendPacket(msg);
  }

  /**
   * Starts a transaction on first use in a transacted session.
   *
   * <p>Does nothing for non-transacted sessions or when a transaction is already
   * marked as started. Otherwise a transaction packet of type 101 is sent
   * asynchronously and the local transaction listener, if any, is told a transaction began.
   *
   * @throws JMSException if the session is closed or the packet cannot be sent
   */
  protected void doStartTransaction()
    throws JMSException
  {
    if (!getTransacted()) {
      return;
    }
    // Atomically flip "transaction started" to true; bail out if it already was.
    if (!this.startTransaction.commit(false, true)) {
      return;
    }

    TransactionInfo startPacket = new TransactionInfo();
    startPacket.setId(this.packetIdGenerator.generateId());
    startPacket.setTransactionId(this.currentTransactionId);
    startPacket.setType(101); // presumably the "start" packet type - confirm against TransactionInfo
    this.connection.asyncSendPacket(startPacket);

    if (this.localTransactionEventListener != null) {
      this.localTransactionEventListener.beginEvent();
    }
  }

  /**
   * Returns the listener notified about local transaction events.
   *
   * @return the listener last set, or null if none
   */
  public LocalTransactionEventListener getLocalTransactionEventListener()
  {
    return localTransactionEventListener;
  }

  /**
   * Installs the listener notified when a local transaction begins, commits or rolls back.
   *
   * @param localTransactionEventListener the listener to install; null removes the current one
   */
  public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener)
  {
    LocalTransactionEventListener replacement = localTransactionEventListener;
    this.localTransactionEventListener = replacement;
  }

  /**
   * Tells whether this session takes part in XA transactions.
   *
   * @return always false in this class; the value is stamped on acks and sent messages
   */
  protected boolean isXaTransacted() {
    final boolean xaTransacted = false;
    return xaTransacted;
  }

  /**
   * Allocates the id for the next transaction.
   *
   * @return a fresh id from the transaction id generator
   */
  protected String getNextTransactionId()
  {
    String nextId = this.transactionIdGenerator.generateId();
    return nextId;
  }

/**
 * Creates a durable consumer on the given topic.
 *
 * <p>Delegates to {@code createDurableSubscriber(Topic, String)}; the returned
 * subscriber is handed back through the {@link MessageConsumer} return type. Previously
 * this stub returned null.
 *
 * @param arg0 the topic to subscribe to
 * @param arg1 the subscription name
 * @return the durable subscriber, never null
 * @throws JMSException if this session has been closed or the subscriber cannot be created
 */
@Override
public MessageConsumer createDurableConsumer(Topic arg0, String arg1) throws JMSException {
	return createDurableSubscriber(arg0, arg1);
}

/**
 * Creates a durable consumer on the given topic with a selector and no-local flag.
 *
 * <p>Delegates to {@code createDurableSubscriber(Topic, String, String, boolean)}; the
 * returned subscriber is handed back through the {@link MessageConsumer} return type.
 * Previously this stub returned null.
 *
 * @param arg0 the topic to subscribe to
 * @param arg1 the subscription name
 * @param arg2 the message selector
 * @param arg3 the no-local flag
 * @return the durable subscriber, never null
 * @throws JMSException if this session has been closed or the subscriber cannot be created
 */
@Override
public MessageConsumer createDurableConsumer(Topic arg0, String arg1, String arg2, boolean arg3) throws JMSException {
	return createDurableSubscriber(arg0, arg1, arg2, arg3);
}

/**
 * Shared non-durable consumers are not implemented by this session.
 *
 * <p>Fails with an explicit exception instead of returning null, which would only
 * surface later as a {@code NullPointerException} in the caller.
 *
 * @param arg0 the topic (unused)
 * @param arg1 the shared subscription name (unused)
 * @return never returns normally
 * @throws JMSException always; an IllegalStateException if the session is closed
 */
@Override
public MessageConsumer createSharedConsumer(Topic arg0, String arg1) throws JMSException {
	checkClosed();
	throw new JMSException("createSharedConsumer is not supported by this session");
}

/**
 * Shared non-durable consumers with a selector are not implemented by this session.
 *
 * <p>Fails with an explicit exception instead of returning null, which would only
 * surface later as a {@code NullPointerException} in the caller.
 *
 * @param arg0 the topic (unused)
 * @param arg1 the shared subscription name (unused)
 * @param arg2 the message selector (unused)
 * @return never returns normally
 * @throws JMSException always; an IllegalStateException if the session is closed
 */
@Override
public MessageConsumer createSharedConsumer(Topic arg0, String arg1, String arg2) throws JMSException {
	checkClosed();
	throw new JMSException("createSharedConsumer is not supported by this session");
}

/**
 * Shared durable consumers are not implemented by this session.
 *
 * <p>Fails with an explicit exception instead of returning null, which would only
 * surface later as a {@code NullPointerException} in the caller.
 *
 * @param arg0 the topic (unused)
 * @param arg1 the shared subscription name (unused)
 * @return never returns normally
 * @throws JMSException always; an IllegalStateException if the session is closed
 */
@Override
public MessageConsumer createSharedDurableConsumer(Topic arg0, String arg1) throws JMSException {
	checkClosed();
	throw new JMSException("createSharedDurableConsumer is not supported by this session");
}

/**
 * Shared durable consumers with a selector are not implemented by this session.
 *
 * <p>Fails with an explicit exception instead of returning null, which would only
 * surface later as a {@code NullPointerException} in the caller.
 *
 * @param arg0 the topic (unused)
 * @param arg1 the shared subscription name (unused)
 * @param arg2 the message selector (unused)
 * @return never returns normally
 * @throws JMSException always; an IllegalStateException if the session is closed
 */
@Override
public MessageConsumer createSharedDurableConsumer(Topic arg0, String arg1, String arg2) throws JMSException {
	checkClosed();
	throw new JMSException("createSharedDurableConsumer is not supported by this session");
}
}